Flink: executing SQL and loading UDF JARs

Contents

1. How is the SQL executed?

2. How are user UDF JARs loaded by the Flink client?

3. How do the Flink JobManager and TaskManager load UDF JARs?

1. How is the SQL executed?

Model management platform: the web platform where the SQL is written.

We developed a common Flink SQL executor model. Its main function initializes the Flink TableEnvironment, calls the model management platform's API to fetch the SQL for the Flink job, and then executes that SQL through the Table API, as sketched below.
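The skeleton below is a minimal sketch of such an executor model, assuming the Flink 1.10 Table API used elsewhere in this article; the fetchSqlFromPlatform helper, the URL argument (standing in for the -reqUrl parameter seen later), and the ';'-separated response format are illustrative assumptions, not the platform's real interface.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlModelMain {
    public static void main(String[] args) throws Exception {
        // Initialize the table environment (Blink planner, streaming mode).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Fetch the job's SQL statements from the model management platform.
        for (String sql : fetchSqlFromPlatform(args[0])) {
            // Flink 1.10 submits DDL/DML with sqlUpdate();
            // from Flink 1.11 on, executeSql() replaces it.
            tableEnv.sqlUpdate(sql);
        }
        tableEnv.execute("flink-sql-model");
    }

    // Hypothetical platform call: GET the SQL text and split it into
    // statements; the real endpoint and response format are not shown here.
    private static String[] fetchSqlFromPlatform(String reqUrl) throws Exception {
        try (java.util.Scanner s = new java.util.Scanner(
                new java.net.URL(reqUrl).openStream(), "UTF-8").useDelimiter("\\A")) {
            return s.next().split(";");
        }
    }
}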

2. How are user UDF JARs loaded by the Flink client?

On the SQL-editing page, the user uploads the UDF JAR and enters the UDF parameters and the function name; the platform's backend saves the JAR on the server. At job-submission time, these JARs are appended to the HADOOP_CLASSPATH environment variable, because HADOOP_CLASSPATH is read by the flink launch script. When the Flink client starts, the JARs on HADOOP_CLASSPATH are passed, together with the JARs under Flink's lib directory, to the client JVM via the -classpath argument.
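To make the dependency concrete: when the executor model registers an uploaded UDF by class name, that class must be resolvable on the client's classpath, which is exactly what the HADOOP_CLASSPATH trick provides. A minimal sketch, assuming the function name and UDF class name are the values entered on the SQL-editing page (registerFunction is the Flink 1.10 API):

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfRegistrar {
    public static void register(TableEnvironment tableEnv,
                                String functionName,
                                String udfClassName) throws Exception {
        // Class.forName only succeeds because the uploaded UDF JAR was
        // appended to HADOOP_CLASSPATH and therefore ended up on the
        // client JVM's -classpath.
        ScalarFunction udf =
                (ScalarFunction) Class.forName(udfClassName).newInstance();
        tableEnv.registerFunction(functionName, udf);
    }
}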

The flink script:

#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

echo "$JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@""

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

The command the Flink client actually runs is shown below. The JVM's -classpath argument loads the JARs under Flink's lib directory together with the JARs from HADOOP_CLASSPATH (including /jars/RsdParams-1.0-jar-with-dependencies.jar, the uploaded UDF/params JAR):

java -Dlog.file=/data/rsd/flink-1.10.0/log/flink-root-client-master.log \
  -Dlog4j.configuration=file:/data/rsd/flink-1.10.0/conf/log4j-cli.properties \
  -Dlogback.configurationFile=file:/data/rsd/flink-1.10.0/conf/logback.xml \
  -classpath /data/rsd/flink-1.10.0/lib/flink-csv-1.10.0-sql-jar.jar:\
/data/rsd/flink-1.10.0/lib/flink-hbase_2.11-1.10.0.jar:\
/data/rsd/flink-1.10.0/lib/flink-jdbc_2.11-1.10.0.jar:\
/data/rsd/flink-1.10.0/lib/flink-json-1.10.0-sql-jar.jar:\
/data/rsd/flink-1.10.0/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:\
/data/rsd/flink-1.10.0/lib/flink-statebackend-rocksdb_2.11-1.10.0.jar:\
/data/rsd/flink-1.10.0/lib/flink-table_2.11-1.10.0.jar:\
/data/rsd/flink-1.10.0/lib/flink-table-blink_2.11-1.10.0.jar:\
/data/rsd/flink-1.10.0/lib/hbase-client-2.1.0.jar:\
/data/rsd/flink-1.10.0/lib/hbase-common-2.1.0.jar:\
/data/rsd/flink-1.10.0/lib/kafka-clients-2.2.1.jar:\
/data/rsd/flink-1.10.0/lib/log4j-1.2.17.jar:\
/data/rsd/flink-1.10.0/lib/protobuf-java-2.4.0a.jar:\
/data/rsd/flink-1.10.0/lib/slf4j-log4j12-1.7.15.jar:\
/data/rsd/flink-1.10.0/lib/spark-model-jar-with-dependencies.jar:\
/data/rsd/flink-1.10.0/lib/flink-dist_2.11-1.10.0.jar:\
/etc/hadoop/conf:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop/lib/*:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop/.//*:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/./:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-hdfs/.//*:\
/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-yarn/lib/*:\
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/libexec/../../hadoop-yarn/.//*:\
/jars/RsdParams-1.0-jar-with-dependencies.jar:\
/etc/hadoop/conf::/etc/hbase/conf \
  org.apache.flink.client.cli.CliFrontend

3. How do the Flink JobManager and TaskManager load UDF JARs?

They are loaded via the --yarnship (-yt) option.

--yarnship uploads the files under the given directory to HDFS. The JobManager and TaskManager containers then download those JARs to their local working directories, and the container launch script adds them to the JM's and TM's classpath, so the UDF JARs can be loaded on the cluster side as well.

/data/rsd/flink-1.10.0/bin/flink run -m yarn-cluster --yarnship /jars -d \
  -ynm test -p 1 -ys 1 -ytm 1024 -yjm 1024 \
  -c com.rongan.realtime.test.MaliciousFlowMicrostepWaring \
  /jars2/RsdRealtime-1.0-jar-with-dependencies.jar \
  --modelName MaliciousFlowMicrostepWaring \
  -reqUrl http://192.168.7.202:8080/modelx/rsd/params/

(Configuration screenshot)

Checking the JARs Flink automatically uploaded to HDFS:

hadoop fs -ls /user/kafka/.flink/application_1627970076203_3335

The listing shows that /jars has been uploaded to HDFS (screenshot of the output omitted).

Checking the JARs downloaded by the JM (screenshot omitted):

Go into the NodeManager data directory on slave1:

cd /data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3335/container_e108_1627970076203_3335_01_000003

You can see in the listing below that the jars directory has been copied down.

This directory holds the container's entire environment, including configuration files, the keytab file, and the dependencies.

(base) [root@slave1 container_e108_1627970076203_3335_01_000003]# ll
total 52
-rw-------. 1 kafka yarn  151 Aug  5 13:50 container_tokens
lrwxrwxrwx. 1 kafka yarn  152 Aug  5 13:50 flink-conf.yaml -> /data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3335/filecache/22/application_1627970076203_3335-flink-conf.yaml2696283394604308718.tmp
lrwxrwxrwx. 1 kafka yarn  110 Aug  5 13:50 flink-dist_2.11-1.10.0.jar -> /data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3335/filecache/31/flink-dist_2.11-1.10.0.jar
drwxr-s---. 2 kafka yarn 4096 Aug  5 13:50 jars
lrwxrwxrwx. 1 kafka yarn   96 Aug  5 13:50 krb5.keytab -> /data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3335/filecache/17/kafka.keytab
-rwx------. 1 kafka yarn 9278 Aug  5 13:50 launch_container.sh
drwxr-s---. 2 kafka yarn 4096 Aug  5 13:50 lib
lrwxrwxrwx. 1 kafka yarn  100 Aug  5 13:50 log4j.properties -> /data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3335/filecache/21/log4j.properties
drwxr-s---. 2 kafka yarn 4096 Aug  5 13:50 plugins
lrwxrwxrwx. 1 kafka yarn  124 Aug  5 13:50 RsdRealtime-1.0-jar-with-dependencies.jar -> /data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3335/filecache/32/RsdRealtime-1.0-jar-with-dependencies.jar
drwxr-s---. 2 kafka yarn 4096 Aug  5 13:50 tmp

Next, open launch_container.sh, the container launch script, in this same directory.

In the CLASSPATH export of the script below, you can see that the JARs from our shipped jars directory have been added.

#!/bin/bash
set -o pipefail -e
export PRELAUNCH_OUT="/data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/prelaunch.out"
exec >"${PRELAUNCH_OUT}"
export PRELAUNCH_ERR="/data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/prelaunch.err"
exec 2>"${PRELAUNCH_ERR}"
echo "Setting up env variables"
export HADOOP_CONF_DIR="/var/run/cloudera-scm-agent/process/5168-yarn-NODEMANAGER"
export JAVA_HOME="/usr/java/jdk1.8.0_181-amd64"
export _CLIENT_HOME_DIR="hdfs://rongan/user/kafka"
export APP_SUBMIT_TIME_ENV="1628146565968"
export NM_HOST="slave2"
export _APP_ID="application_1627970076203_3409"
export HADOOP_USER_NAME="[email protected]"
export HADOOP_HDFS_HOME="/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop-hdfs"
export LOGNAME="kafka"
export JVM_PID="$$"
export HADOOP_MAPRED_HOME="/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop-mapreduce"
export PWD="/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001"
export HADOOP_COMMON_HOME="/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop"
export LOCAL_DIRS="/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409,/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409"
export APPLICATION_WEB_PROXY_BASE="/proxy/application_1627970076203_3409"
export NM_HTTP_PORT="8042"
export HADOOP_CLIENT_CONF_DIR="/etc/hadoop/conf.cloudera.yarn"
export LOG_DIRS="/data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001"
export NM_AUX_SERVICE_mapreduce_shuffle="AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
export HADOOP_YARN_HOME="/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop-yarn"
export CLASSPATH=":RsdRealtime-1.0-jar-with-dependencies.jar:jars/RsdParams-1.0-jar-with-dependencies.jar:jars/jaxb-core-2.3.0.jar:jars/spark-model-jar-with-dependencies.jar:lib/flink-csv-1.10.0-sql-jar.jar:lib/flink-hbase_2.11-1.10.0.jar:lib/flink-jdbc_2.11-1.10.0.jar:lib/flink-json-1.10.0-sql-jar.jar:lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:lib/flink-statebackend-rocksdb_2.11-1.10.0.jar:lib/flink-table-blink_2.11-1.10.0.jar:lib/flink-table_2.11-1.10.0.jar:lib/hbase-client-2.1.0.jar:lib/hbase-common-2.1.0.jar:lib/kafka-clients-2.2.1.jar:lib/log4j-1.2.17.jar:lib/protobuf-java-2.4.0a.jar:lib/slf4j-log4j12-1.7.15.jar:lib/spark-model-jar-with-dependencies.jar:flink-dist_2.11-1.10.0.jar:flink-conf.yaml:job.graph::$HADOOP_CLIENT_CONF_DIR:$HADOOP_COMMON_HOME/*:$HADOOP_COMMON_HOME/lib/*:$HADOOP_HDFS_HOME/*:$HADOOP_HDFS_HOME/lib/*:$HADOOP_YARN_HOME/*:$HADOOP_YARN_HOME/lib/*"
export _KEYTAB_PRINCIPAL="[email protected]"
export _FLINK_YARN_FILES="hdfs://rongan/user/kafka/.flink/application_1627970076203_3409"
export _ZOOKEEPER_NAMESPACE="application_1627970076203_3409"
export HADOOP_TOKEN_FILE_LOCATION="/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/container_tokens"
export _KEYTAB_PATH="hdfs://rongan/user/kafka/.flink/application_1627970076203_3409/kafka.keytab"
export NM_AUX_SERVICE_spark_shuffle=""
export _FLINK_JAR_PATH="hdfs://rongan/user/kafka/.flink/application_1627970076203_3409/flink-dist_2.11-1.10.0.jar"
export LOCAL_USER_DIRS="/data/yarn/nm/usercache/kafka/,/data/yarn/nm2/usercache/kafka/"
export _FLINK_CLASSPATH=":RsdRealtime-1.0-jar-with-dependencies.jar:jars/RsdParams-1.0-jar-with-dependencies.jar:jars/jaxb-core-2.3.0.jar:jars/spark-model-jar-with-dependencies.jar:lib/flink-csv-1.10.0-sql-jar.jar:lib/flink-hbase_2.11-1.10.0.jar:lib/flink-jdbc_2.11-1.10.0.jar:lib/flink-json-1.10.0-sql-jar.jar:lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:lib/flink-statebackend-rocksdb_2.11-1.10.0.jar:lib/flink-table-blink_2.11-1.10.0.jar:lib/flink-table_2.11-1.10.0.jar:lib/hbase-client-2.1.0.jar:lib/hbase-common-2.1.0.jar:lib/kafka-clients-2.2.1.jar:lib/log4j-1.2.17.jar:lib/protobuf-java-2.4.0a.jar:lib/slf4j-log4j12-1.7.15.jar:lib/spark-model-jar-with-dependencies.jar:flink-dist_2.11-1.10.0.jar:flink-conf.yaml:job.graph:"
export HOME="/home/"
export CONTAINER_ID="container_e108_1627970076203_3409_01_000001"
export MALLOC_ARENA_MAX="4"
echo "Setting up job resources"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/30/flink-csv-1.10.0-sql-jar.jar" "lib/flink-csv-1.10.0-sql-jar.jar"
mkdir -p jars
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/14/jaxb-core-2.3.0.jar" "jars/jaxb-core-2.3.0.jar"
mkdir -p plugins
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/26/README.txt" "plugins/README.txt"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/20/flink-json-1.10.0-sql-jar.jar" "lib/flink-json-1.10.0-sql-jar.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/11/flink-jdbc_2.11-1.10.0.jar" "lib/flink-jdbc_2.11-1.10.0.jar"
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/17/kafka.keytab" "krb5.keytab"
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/22/application_1627970076203_3409-flink-conf.yaml4617629001914652036.tmp" "flink-conf.yaml"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/19/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar" "lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/34/kafka-clients-2.2.1.jar" "lib/kafka-clients-2.2.1.jar"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/16/hbase-common-2.1.0.jar" "lib/hbase-common-2.1.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/27/flink-table-blink_2.11-1.10.0.jar" "lib/flink-table-blink_2.11-1.10.0.jar"
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/24/application_1627970076203_34096881925228060113795.tmp" "job.graph"
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/31/flink-dist_2.11-1.10.0.jar" "flink-dist_2.11-1.10.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/12/flink-table_2.11-1.10.0.jar" "lib/flink-table_2.11-1.10.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/29/protobuf-java-2.4.0a.jar" "lib/protobuf-java-2.4.0a.jar"
mkdir -p jars
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/18/RsdParams-1.0-jar-with-dependencies.jar" "jars/RsdParams-1.0-jar-with-dependencies.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/23/flink-hbase_2.11-1.10.0.jar" "lib/flink-hbase_2.11-1.10.0.jar"
mkdir -p jars
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/13/spark-model-jar-with-dependencies.jar" "jars/spark-model-jar-with-dependencies.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/25/spark-model-jar-with-dependencies.jar" "lib/spark-model-jar-with-dependencies.jar"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/28/flink-statebackend-rocksdb_2.11-1.10.0.jar" "lib/flink-statebackend-rocksdb_2.11-1.10.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/33/hbase-client-2.1.0.jar" "lib/hbase-client-2.1.0.jar"
mkdir -p lib
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/15/log4j-1.2.17.jar" "lib/log4j-1.2.17.jar"
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/32/RsdRealtime-1.0-jar-with-dependencies.jar" "RsdRealtime-1.0-jar-with-dependencies.jar"
ln -sf "/data/yarn/nm/usercache/kafka/appcache/application_1627970076203_3409/filecache/21/log4j.properties" "log4j.properties"
mkdir -p lib
ln -sf "/data/yarn/nm2/usercache/kafka/appcache/application_1627970076203_3409/filecache/10/slf4j-log4j12-1.7.15.jar" "lib/slf4j-log4j12-1.7.15.jar"
echo "Launching container"
exec /bin/bash -c "$JAVA_HOME/bin/java -Xms424m -Xmx424m -XX:MaxMetaspaceSize=256M -Dlog.file="/data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/jobmanager.log" -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 1> /data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/jobmanager.out 2> /data/yarn/container-logs/application_1627970076203_3409/container_e108_1627970076203_3409_01_000001/jobmanager.err"


